HBase BulkLoad Spark实现过程及一些问题

您所在的位置:网站首页 bulk load HBase BulkLoad Spark实现过程及一些问题

HBase BulkLoad Spark实现过程及一些问题

#HBase BulkLoad Spark实现过程及一些问题| 来源: 网络整理| 查看: 265

文章目录 HBase储存原理BulkLoadMaven重复依赖maven对于重复依赖的处理方式解决方案 Spark导入HBase classpath

HBase储存原理

HBase存储数据其底层使用的是HDFS来作为存储介质,HBase的每一张表对应的HDFS目录上的一个文件夹,文件夹名以HBase表进行命名(如果没有使用命名空间,则默认在default目录下),在表文件夹下存放在若干个Region命名的文件夹,Region文件夹中的每个列簇也是用文件夹进行存储的,每个列簇中存储就是实际的数据,以HFile的形式存在。路径格式如下:

/hbase/data/default//// BulkLoad

批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多。

BulkLoad :按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后在通过RegionServer将HFile数据文件移动到相应的Region上去。

public class BulkLoad { public static void main(String[] args) throws Exception { if(args.length !=2){ System.out.println("请输入文件目录和表名"); return; } String filesDir = args[0]; SparkSession spark = SparkSession.builder().appName("bulkLoad").getOrCreate(); JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext()); JavaPairRDD unresolvedRDD = javaSparkContext.wholeTextFiles(filesDir); JavaPairRDD pointCloudRDD = unresolvedRDD.mapToPair(tuple->{ String filePath = tuple._1; String fileNameWithSuffix = filePath.substring(filePath.lastIndexOf(File.separator)+1); return new Tuple2(fileNameWithSuffix, tuple._2.getBytes(Charsets.ISO_8859_1)); }); JavaPairRDD hFileRDD = pointCloudRDD.mapToPair(tuple->{ String fileNameWithSuffix = tuple._1; String fileName = fileNameWithSuffix.split("\\.")[0]; String suffix = fileNameWithSuffix.split("\\.")[1]; byte[] rowKey = Bytes.toBytes(fileName); ImmutableBytesWritable immutableRowKey = new ImmutableBytesWritable(rowKey); byte[] columnFamily = Bytes.toBytes("data"); byte[] columnQualifier = Bytes.toBytes(suffix); KeyValue keyValue = new KeyValue(rowKey, columnFamily, columnQualifier, tuple._2); return new Tuple2(immutableRowKey, keyValue); }); Configuration hConf = HBaseConfiguration.create(); String tableName = args[1]; HBaseUtils.createTable(tableName, new String[]{"data"}); hConf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName); TableName hTableName = TableName.valueOf(tableName); Connection connection = ConnectionFactory.createConnection(hConf); Table table = connection.getTable(hTableName); RegionLocator regionLocator = connection.getRegionLocator(hTableName); String hFileOutPut = "hdfs://master:9000/pzx/hFile"; hFileRDD.saveAsNewAPIHadoopFile(hFileOutPut, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, hConf); LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(hConf); bulkLoader.doBulkLoad(new Path(hFileOutPut), connection.getAdmin(), table, regionLocator); } }

主要过程:

将RDD转换为JavaPairRDD形式,ImmutableBytesWritable为行键,KeyValue为储存的映射数据。调用RDD的saveAsNewAPIHadoopFile方法,设置结果输出路径,输出格式等参数。将RDD转换为HFile,储存在文件系统中。获取HBase指定表的RegionLocator。构造LoadIncrementalHFiles对象, 将HFile输出路径,表RegionLocator等参数传入。调用LoadIncrementalHFiles对象的doBulkLoad方法,RegionServer会到HFile输出路径获取自己所需的HFile文件。 Maven重复依赖

在IDEA中进行代码调试时,需要引入spark和hbase相关的依赖。

com.pzx myHadoopUtils 1.0-SNAPSHOT org.apache.hbase hbase-client 1.4.9 provided org.apache.spark spark-core_2.12 2.4.4 provided org.apache.spark spark-sql_2.12 2.4.4 provided org.apache.hbase hbase-server 1.4.9 provided

但是如果直接导入依赖后运行会报错: Exception in thread “main” java.lang.NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.metric()Lio/netty/buffer/PooledByteBufAllocatorMetric;

这是由于hbase和spark中的netty-all依赖不同的版本的冲突。 在这里插入图片描述

由IDEA的插件 Maven Helper可以查看上图。或者通过mvn dependency:tree查看依赖关系。

maven对于重复依赖的处理方式

maven只会挑选重复依赖的一个版本进行导入。

dependencyManagement标签中如果定义了重复依赖的版本,则导入此版本。如果dependencyManagement中没有定义重复依赖的版本,则利用最短路径原则: 比如有如下两个依赖关系: A -> B -> C -> D(V1) F -> G -> D(V2) 这个时候项目中就出现了两个版本的D,这时maven会采用最短路径原则,选择V2版本的D,因为V1版本的D是由A包间接依赖的,整个依赖路径长度为3,而V2版本的D是由F包间接依赖的,整个依赖路径长度为2。如果重复依赖的路径长度是相同的,则使用声明优先原则。比如: A -> B -> D(V1) F -> G -> D(V2) 这个时候因为两个版本的D的依赖路径都是一样长,最短路径原则就失效了。这个时候Maven的解决方案是:按照依赖包在pom.xml中声明的先后顺序,优先选择先声明的包。

再看上面的pom.xml导入的依赖,没有定义dependencyManagement标签。所以使用最短路径原则。对于netty-all依赖,hbase-client和spark-core的依赖路径长度都是1,所以使用声明优先原则,所以使用hbase-client中定义的netty-all依赖版本4.1.8。(上图中显示的spark-core依赖的nettt-all的版本是错误的,应该是4.1.17)。

由于最终导入了hbase-client依赖的4.1.8版本的netty-all,所以spark-core中的代码无法使用4.1.17版本中的函数,最终导致报错,找不到函数。

解决方案

在pom.xml将spark-core声明在hbase-client前面。此时依赖关系变为: 在这里插入图片描述 可以看出maven最终导入的是spark-core所依赖的netty-all的4.1.17版本。

使用标签,将hbase-client中的netty-all排除。此时依赖关系变为: 在这里插入图片描述

定义dependencyManagement标签,声明netty-all版本。

io.netty netty-all 4.1.18.Final

此时依赖关系变为: 在这里插入图片描述

Spark导入HBase classpath

上述maven依赖是在idea中开发时出现的问题,当开发完成之后需要将代码打包成jar,上传到集群中进行执行。对于spark、hbase相关的依赖jar包在打包时都不需要加入,因为集群中安装的spark和hbase目录中已经含有这些jar。如果再将其打包,会导致生成的jar包很大,含有大量冗余的class文件。

对于spark依赖的jar,使用spark-submit命令提交任务时会自动将spark目录下jars目录中的所有jar包加入classpath,但是并不包含hbase依赖的jar。所以需要手动引入hbase的依赖。

使用spark.driver.extraClassPath和spark.executor.extraClassPath可以为Spark任务的driver进程和executor进程设置额外的classpath。但是并不能直接设置$HBASE_HOME/lib/*,这是因为Hbase依赖的jar中包含netty-all.jar,其还是会和spark依赖的netty-all产生冲突。

我的处理方法,是在spark目录下新建一个文件夹hbase-jars(注意:集群上的所有节点必须进行同样的设置)。 cp $HBASE_HOME/lib/hbase*.jar $SPARK_HOME/hbase-jars cp $HBASE_HOME/lib/metrics-core*.jar $SPARK_HOME/hbase-jars

然后在spark-submit提交jar时,加入配置:

--conf spark.driver.extraClassPath="/usr/local/spark-2.4.4/hbase-jars/*" --conf spark.executor.extraClassPath="/usr/local/spark-2.4.4/hbase-jars/*"

设置spark.driver.extraClassPath和spark.executor.extraClassPath适合于引入大量依赖jar包的情况。对于只需要引入几个jar的情况,还可以使用-jars选项或者SparkContext.addJar(…)方法。

两者的区别在于,前者要求集群中所有节点必须都在设置的extreClassPath路径中含有所需的jar包。后者只要求在提交的节点上含有jar,集群中其他节点可以使用http get方式,通过网络请求的方式获取所需的jar包(只是一种情况,还有其他情况)。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3